查看原文
其他

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

素文宅博客 Java精选 2022-08-09


消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另一种是订阅\发布的消息模式



点对点的消息模式

点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。


Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-core</artifactId>

    <version>5.7.0</version>

</dependency>


点对点的发送方逻辑代码

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


public class MQSender {

    

    private String userName = "root";

    private String password = "123456";

    private String url = "tcp://127.0.0.1:61616";

    

    public static void main(String[] args) {

        MQSender send = new MQSender();

        send.start();

    }

    

    public void start(){

        try {

            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);

            Connection connection = factory.createConnection();

            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建

            Destination destination = session.createQueue("textMsg");

            MessageProducer producer = session.createProducer(destination);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            TextMessage textMsg = session.createTextMessage("消息内容");

            for(int i = 0 ; i < 10; i ++){

                producer.send(textMsg);

            }

            producer.close();

            

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}


点对点的接收方代码

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


public class PTPReceive {


    private String userName = "root";

    private String password = "123456";

    private String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) {

        PTPReceive receive = new PTPReceive();

        receive.start();

    }

    

    public void start(){

        try {

            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);

            Connection connection = factory.createConnection();

            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createQueue("textMsg");

            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {

                @Override

                public void onMessage(Message message) {

                    try {

                        String text = ((TextMessage)message).getText();

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            });

            consumer.close();

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}


订阅/发布的消息模式

订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。


Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-core</artifactId>

    <version>5.7.0</version>

</dependency>


订阅/发布的发送方代码

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


public class MQSender {

    

    private String userName = "root";

    private String password = "123456";

    private String url = "tcp://127.0.0.1:61616";

    

    public static void main(String[] args) {

        MQSender send = new MQSender();

        send.start();

    }

    

    public void start(){

        try {

            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);

            Connection connection = factory.createConnection();

            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建

            Destination destination = session.createTopic("textMsg");

            MessageProducer producer = session.createProducer(destination);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            TextMessage textMsg = session.createTextMessage("消息内容");

            for(int i = 0 ; i < 10; i ++){

                producer.send(textMsg);

            }

            producer.close();

            

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}


订阅/发布的接收方代码

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


public class PTPReceive {


    private String userName = "root";

    private String password = "123456";

    private String url = "tcp://127.0.0.1:61616";

    public static void main(String[] args) {

        PTPReceive receive = new PTPReceive();

        receive.start();

    }

    

    public void start(){

        try {

            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);

            Connection connection = factory.createConnection();

            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic("textMsg");

            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {

                @Override

                public void onMessage(Message message) {

                    try {

                        String text = ((TextMessage)message).getText();

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            });

            consumer.close();

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}


推荐阅读

 

Java面试高级篇—Java NIO:浅析I/O模型面试题15期

Java面试高级篇—详谈Java四种线程池及new Thread的弊端面试题14期

Java面试高级篇—说说TCP,UDP和socket,Http之间联系和区别面试题12期

Java面试高级篇—Session和Cookie的区别与联系面试题12期

Java面试高级篇—Hash冲突怎么办,哪些解决散列冲突的方法?面试题11期

Java面试高级篇—Java中的队列都有哪些,有什么区别面试题10期

Java面试高级篇—HashMap源码实现原理及底层结构面试题9期


更多推荐↓↓↓
 

关注微信公众号“Java精选”(w_z90110),回复关键字领取资料:如HadoopDubboCAS源码等等,免费领取资料视频和项目。 


涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis缓存、Spring源码、各大主流框架、Web开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL非关系型数据库、运维等。

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存